#!/usr/bin/perl -w
use strict;
use Getopt::Long;
use Set::Scalar;
use IO::Select;

# Bookkeeping shared by the launch loop and the read loop further down.
my %pid;    # keys: machine names. Values: pid returned by the piped open for that machine
my %proc;   # keys: numeric address of a pipe handle. Values: the machine name it reads from
my %method; # keys: machine or cluster names. Values: unique name of the generated accessor sub

# Command line options
my $configfile = "Cluster";
my $scp = 'scp';
my $scpoptions = '';
my $maxprocesses = 8; # control the maximum number of processes at any time
                      # NOTE(review): parsed, but nothing visible in this file consults it
my $verbose = 0;
my $cssh = 0;         # target of --xterm; it was never declared, which is fatal under 'use strict'

my $result = GetOptions(
                'configfile=s' => \$configfile,
                'scpoptions=s' => \$scpoptions,
                'program=s'    => \$scp,
                'processes=i'  => \$maxprocesses,
                'verbose'      => \$verbose,
                'xterm'        => \$cssh,
                # No help() sub exists in this file, so show the usage text
                # (usage() prints to STDERR and exits).
                'help'         => sub { usage('parscp: parallel secure copy to sets of machines') },
             );

# GetOptions has already reported what was wrong; stop instead of running
# with half-parsed options.
usage('Error. Invalid command line options!') unless $result;

usage('Error. Configuration file not found!') unless -r $configfile;

# Table of machine/cluster name => set of member machines.
my %cluster = parse_configfile($configfile);

# Install one accessor sub per name so that cluster expressions can be eval'ed.
create_machine_alias(%cluster);

# First positional argument: the file or directory to copy.
my $sourcefile = shift;
usage("Error. Can't find source file\n") unless defined($sourcefile) && -r $sourcefile;

# A directory needs scp's recursive flag. Append it rather than assigning, so
# that options supplied with --scpoptions are not thrown away.
if (-d $sourcefile) {
  $scpoptions = length($scpoptions) ? "$scpoptions -r" : '-r';
}

# Everything left on the command line is a 'clusterexp:path' destination.
my @destination = @ARGV;
usage("Error. Provide a destination target!\n") unless @destination;

# Start one scp per destination machine and register its output pipe in
# $readset so the read loop can multiplex over all of them.
my $readset = IO::Select->new();
DESTINATION: for my $dest (@destination) {
  # Limit of 2: only the first colon separates the cluster expression from the
  # remote path, and a trailing colon yields an empty path instead of undef.
  my ($clusterexp, $path) = split /\s*:\s*/, $dest, 2;
  $path = '' unless defined $path;

  unless (defined($clusterexp) && length($clusterexp)) {
    warn "Error. Destination $dest must have a cluster specification. Skipping transfer.";
    next DESTINATION;
  }

  # Every identifier must be a known machine or cluster name; otherwise the
  # substitution below would interpolate undef and fail with an obscure error.
  my @unknown = grep { !exists $method{$_} } $clusterexp =~ /([a-zA-Z_]\w*)/g;
  if (@unknown) {
    warn "Error. Unknown machine or cluster in expression $clusterexp: @unknown. Skipping.\n";
    next DESTINATION;
  }

  # Rewrite each name into a call to its generated accessor sub and evaluate
  # the resulting expression; it must yield a Set::Scalar object.
  # NOTE(review): this is a string eval of command-line text. It is tolerable
  # only because the expression is typed by the invoking user.
  $clusterexp =~ s/([a-zA-Z_]\w*)/$method{$1}()/g;
  my $set = eval $clusterexp;
  my $evalerr = $@;   # saved: the guarded isa() call below resets $@

  # The block eval keeps ->isa from dying when the result is an unblessed ref.
  unless (ref($set) && eval { $set->isa('Set::Scalar') }) {
    warn "Error. Expression $clusterexp has errors. Skipping.\n$evalerr\n";
    next DESTINATION;
  }

  for my $m ($set->members) {
    # NOTE(review): the command is run by the shell (needed for 2>&1), so the
    # file name, options and path are subject to shell interpretation.
    my $command = "$scp $scpoptions $sourcefile $m:$path";
    warn "Executing system command:\n\t$command\n" if $verbose;

    my $pid = open(my $p, '-|', "$command 2>&1");
    unless (defined $pid) {
      # The original 'warn ..., next unless ...' evaluated 'next' as one of
      # warn's arguments, so this message was never shown.
      warn "Can't execute $command: $!\n";
      next;
    }

    $pid{$m} = $pid;
    $proc{0+$p} = $m;   # numeric address of the handle identifies it later
    $readset->add($p);
  }
}

# Drain the output of every started scp until all of their pipes are closed.
my $np = keys %proc; # number of processes
my %output;          # machine name => output collected so far
my @ready;           # handles reported readable but not yet serviced
my $count = 0;       # number of pipes already retired
# A 'while' instead of 'do {} until' so that nothing is attempted when no
# process was started, and so that 'next' is legal inside the body.
SYNCHRO: while ($count < $np) {
  @ready = $readset->can_read unless @ready;
  my $handle = shift @ready;

  # can_read gives back nothing when the underlying select fails; ask again.
  next SYNCHRO unless defined $handle;

  my $name = $proc{0+$handle};

  unless (defined($name) && $name) {
    warn "Error. Received message from unknown handle\n";
    $name = 'unknown';
  }
  $output{$name} = '' unless defined $output{$name};

  my $partial = '';
  my $numBytesRead = sysread($handle, $partial, 65535);

  if (defined $numBytesRead) {
    $output{$name} .= $partial;
    next SYNCHRO if $numBytesRead;   # not at eof yet; more may follow
  }
  elsif ($!{EINTR}) {
    next SYNCHRO;                    # interrupted by a signal: retry
  }
  else {
    # A failed read never reaches eof; retire the handle so the loop ends.
    warn "Error. Can't read output of $name: $!\n";
  }

  # eof (or unrecoverable read error): report and retire this pipe
  if ($verbose) {
    print "$name output:\n";
    $output{$name} =~ s/^/$name:/gm if length($output{$name});
    print "$output{$name}\n";
  }
  $readset->remove($handle);
  $count++;

  # Closing a pipe waits for the child; a false result with $! == 0 means the
  # command exited with a non-zero status.
  unless (close($handle)) {
    warn $! ? "Error. Can't close pipe from $name: $!\n"
            : "Error. Transfer to $name exited with status " . ($? >> 8) . "\n";
  }
}


# For every machine or cluster name in the given table, install a uniquely
# named sub in the current package that returns the value stored under that
# name, and record the generated sub name in %method.
#
# Parameters: a flat key/value list (name => set).
sub create_machine_alias {
  my %sets = @_;

  for my $label (keys %sets) {
    my $alias  = uniquename($label);
    my $target = join '::', __PACKAGE__, $alias;

    {
      # Symbolic glob assignment is what creates the named sub at runtime.
      no strict 'refs';
      *{$target} = sub { $sets{$label} };
    }

    $method{$label} = $alias;
  }
}

############################################################
# Read the cluster description file and build the name => set table.
#
# Each meaningful line has the form
#     name = member1 member2 ...
# Blank lines and lines whose first non-blank character is '#' are ignored.
# Lines without '=' or without a name are reported and skipped.
#
# Parameters: $configfile - path of the description file.
# Returns:    a flat key/value list. Every cluster name maps to a
#             Set::Scalar built from its members; every member name not
#             already present maps to a Set::Scalar holding just itself.
# Dies:       when the file cannot be opened.
sub parse_configfile {
  my $configfile = shift;
  my %cluster;

  open(my $f, '<', $configfile)
    or die "Error. Can't open configuration file $configfile: $!\n";

  while (my $line = <$f>) {
    chomp $line;
    next if $line =~ /^\s*(#.*)?$/;

    # Surrounding blanks must not become part of the cluster name.
    $line =~ s/^\s+|\s+$//g;

    unless ($line =~ /=/) {
      warn "Warning. Ignoring line $. of $configfile (missing '='): $line\n";
      next;
    }

    my ($cluster, $members) = split /\s*=\s*/, $line;
    #die "Error in configuration file $configfile invalid cluster name $cluster" unless $cluster =~ /^[a-zA-Z_]\w*$/;

    unless (defined($cluster) && length($cluster)) {
      warn "Warning. Ignoring line $. of $configfile (missing cluster name): $line\n";
      next;
    }

    # 'name =' with nothing after it defines an empty cluster.
    my @members = defined($members) ? split(/\s+/, $members) : ();

    for my $m (@members) {
      #die "Error in configuration file $configfile invalid name $m" unless $m =~ /^[a-zA-Z_]\w*$/;
      $cluster{$m} = Set::Scalar->new($m) unless exists $cluster{$m};
    }
    # Assigned last so a cluster definition wins over a same-named member entry.
    $cluster{$cluster} = Set::Scalar->new(@members);
  }

  close($f);

  return %cluster;
}

############################################################
# Print the given message followed by the usage text on STDERR, then
# terminate the program with exit status 1.
#
# Parameters: $errmsg - message shown before the usage text.
# Returns:    never returns.
sub usage {
  my ($errmsg) = @_;

  my $helptext = <<"HELPMSG";
Usage:
  parscp sourcefile clusterexp:/tmp/
  parscp -c Machines sourcefile clusterexp:/tmp/           # Specifies the cluster description file
  parscp sourcefile clusterexp1:/tmp/ clusterexp2:/scratch/alu32/

Cluster expressions like:

  parscp file 'beo-europa:/tmp'                           # send to all machines in cluster beo but europa
  parscp -s '-q' gmsh.pl  beowulf+orion:/tmp europa:/tmp/ # union: send to both
  parscp -s '-v' -v gmsh.pl  beowulf+orion:/tmp europa:/tmp/ 
  parscp gmsh.pl  '(beowulf+orion)-orion:/tmp'  # non sense. Justo to see 

are accepted.

Example of cluster file:

beo = beowulf europa orion
be  = beowulf europa
bo  = beowulf orion
eo  =  europa orion
et  = europa etsii
#     europa          etsii
num = 193.145.105.175 193.145.101.246

HELPMSG

  warn "$errmsg\n";
  warn $helptext;
  exit(1);
}

############################################################
{
  # Number of names handed out so far; visible only to uniquename.
  my $issued = 0;

  # Build a name that never repeats within one run: a running serial number
  # followed by the given label with every non-word character replaced by
  # an underscore, in the form "_<serial>_<label>".
  sub uniquename {
    my ($label) = @_;

    (my $sanitized = $label) =~ s/\W/_/g;
    $issued += 1;

    return join '_', '', $issued, $sanitized;
  }
}

__END__

=head1 NAME

parscp - Parallel secure copy 

=head1 SYNOPSIS

  # Copy to the union of cluster1 and cluster2
  parscp sourcefile  cluster1+cluster2:/tmp 

  # Copy to the intersection of cluster1 and cluster2
  parscp sourcefile  cluster1*cluster2:/tmp 

  # Copy to the machines in cluster1 that don't belong to cluster2
  parscp sourcefile  cluster1-cluster2:/tmp 

  # A more complicated formula:
  parscp sourcefile  '(cluster1+cluster2)-num:/tmp'

  # Several cluster expressions may appear
  parscp sourcefile  cluster1-cluster2:/tmp  cluster2-cluster1:

  # You need to set a configuration file:
      $ cat Cluster
      cluster1 = machine1 machine2 machine3
      cluster2 = machine2 machine4 machine5
      num = 193.140.101.175 193.140.101.246

